package net.quanter.shield.mq.rocketmq.producer;

import com.alibaba.fastjson2.JSON;
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import net.quanter.shield.common.dto.result.ResultDTO;
import net.quanter.shield.mq.MQMessageVO;
import net.quanter.shield.mq.MQProducer;
import net.quanter.shield.mq.rocketmq.param.RocketMQBorkerParam;
import net.quanter.shield.mq.rocketmq.param.RocketMQTopicParam;
import org.apache.commons.lang3.StringUtils;

/**
 * Producer service for the commercial edition of RocketMQ.
 *
 * <p>Messages are published through the Aliyun MQ HTTP client ({@link MQClient}). One client and
 * one HTTP producer are created per instance, bound to the topic given at construction time.
 *
 * @param <T> payload type carried by {@link MQMessageVO}
 */
@Slf4j
public class RocketMQHttpProducer<T> implements MQProducer<T, TopicMessage> {

    /** Broker connection settings (endpoint, access id, access key). */
    RocketMQBorkerParam mqConnectVO;
    /** Topic settings (name and optional instance id). */
    RocketMQTopicParam topic;
    /** Underlying HTTP client; released by {@link #close()}. */
    MQClient mqClient;
    /** Producer obtained from {@link #mqClient} for the configured topic. */
    com.aliyun.mq.http.MQProducer httpProducer;

    /**
     * Creates the HTTP client and the producer for the given topic.
     *
     * <p>The instance-scoped producer is used when the topic has a non-blank instance id,
     * otherwise the producer is looked up by topic name only.
     *
     * @param mqConnectVO broker connection settings, must not be {@code null}
     * @param topic       topic settings, must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public RocketMQHttpProducer(RocketMQBorkerParam mqConnectVO, RocketMQTopicParam topic) {
        // Validate both arguments before any client is created, so that a null topic
        // cannot leave an already-opened MQClient behind.
        this.mqConnectVO = Objects.requireNonNull(mqConnectVO, "mqConnectVO");
        this.topic = Objects.requireNonNull(topic, "topic");
        this.mqClient = new MQClient(
                mqConnectVO.getEndPoint(),
                mqConnectVO.getAccessId(),
                mqConnectVO.getAccessKey()
        );
        try {
            if (StringUtils.isNotBlank(topic.getInstanceId())) {
                this.httpProducer = mqClient.getProducer(topic.getInstanceId(), topic.getName());
            } else {
                this.httpProducer = mqClient.getProducer(topic.getName());
            }
        } catch (RuntimeException | Error e) {
            // The caller never receives this instance, so nobody else can close the client.
            try {
                mqClient.close();
            } catch (RuntimeException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /**
     * Builds the {@link TopicMessage} to publish from the generic message wrapper.
     *
     * <p>Copies message id, request id, body (taken from {@code getBase64Obj()}), tag,
     * sharding key and properties.
     *
     * @param mqMessageVO source message, must not be {@code null}
     * @return a new {@link TopicMessage} populated from {@code mqMessageVO}
     * @throws NullPointerException if {@code mqMessageVO} is {@code null}
     */
    @Override
    public TopicMessage getSourceMessageFromMQMessage(MQMessageVO<T> mqMessageVO) {
        Objects.requireNonNull(mqMessageVO, "mqMessageVO");
        TopicMessage topicMessage = new TopicMessage();
        topicMessage.setMessageId(mqMessageVO.getMessageId());
        topicMessage.setRequestId(mqMessageVO.getRequestId());
        topicMessage.setMessageBody(mqMessageVO.getBase64Obj());
        topicMessage.setMessageTag(mqMessageVO.getTag());
        topicMessage.setShardingKey(mqMessageVO.getShardKey());
        topicMessage.setProperties(mqMessageVO.getProperties());
        return topicMessage;
    }

    /**
     * Publishes {@code message} and writes the publish result back into {@code mqMessageVO}.
     *
     * <p>On success the message id, body MD5, properties (when present) and request id returned
     * by the publish call are copied onto {@code mqMessageVO}. Any failure is logged and reported
     * through the returned result instead of being thrown.
     *
     * @param mqMessageVO wrapper that receives the ids returned by the publish call
     * @param message     message to publish
     * @return {@code errorProducerNull} when no producer is available, {@link ResultDTO#SUCCESS}
     *         on success, otherwise a failure result carrying the error description
     */
    @Override
    public ResultDTO send(MQMessageVO mqMessageVO, TopicMessage message) {
        if (httpProducer == null) {
            return errorProducerNull;
        }
        try {
            TopicMessage result = httpProducer.publishMessage(message);
            mqMessageVO.setMessageId(result.getMessageId());
            mqMessageVO.setMessageMD5(result.getMessageBodyMD5());
            if (result.getProperties() != null) {
                mqMessageVO.putAll(result.getProperties());
            }
            mqMessageVO.setRequestId(result.getRequestId());
            return ResultDTO.SUCCESS;
        } catch (Throwable e) {
            // Render the message defensively: a failure while serializing it for the log
            // must not replace the original error or escape this method.
            log.error("RocketMQHttpProducer send error,mqMessageVO={}",
                    describeForLog(mqMessageVO),
                    e);
            // Some exceptions carry no message; fall back to toString() so the failure
            // result still states what went wrong.
            String reason = e.getMessage() != null ? e.getMessage() : e.toString();
            return ResultDTO.failure().message(reason);
        }
    }

    /**
     * Closes the underlying HTTP client, if one is present.
     */
    @Override
    public void close() {
        if (mqClient != null) {
            mqClient.close();
        }
    }

    /** Renders the message as JSON for logging, never throwing. */
    private static String describeForLog(MQMessageVO<?> mqMessageVO) {
        try {
            return String.valueOf(JSON.toJSON(mqMessageVO));
        } catch (Throwable serializationError) {
            return "<unserializable message: " + serializationError + ">";
        }
    }
}
